From 69fab536d5cac10fd0ed5287e01d99e197e871d5 Mon Sep 17 00:00:00 2001 From: Tim Starling Date: Sun, 19 Oct 2008 04:41:28 +0000 Subject: [PATCH] Some initial work on external storage recompression. Committing now in case I accidentally destroy it in a fit of refactoring. --- maintenance/storage/blob_tracking.sql | 36 +++++ maintenance/storage/trackBlobs.php | 200 ++++++++++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 maintenance/storage/blob_tracking.sql create mode 100644 maintenance/storage/trackBlobs.php diff --git a/maintenance/storage/blob_tracking.sql b/maintenance/storage/blob_tracking.sql new file mode 100644 index 0000000000..119bd3b863 --- /dev/null +++ b/maintenance/storage/blob_tracking.sql @@ -0,0 +1,36 @@ + +-- Table for tracking blobs prior to recompression or similar maintenance operations + +CREATE TABLE /*$wgDBprefix*/blob_tracking ( + -- page.page_id + -- This may be zero for orphan or deleted text + bt_page integer not null, + + -- revision.rev_id + -- This may be zero for orphan or deleted text + bt_rev_id integer not null, + + -- text.old_id + bt_text_id integer not null, + + -- The ES cluster + bt_cluster varbinary(255), + + -- The ES blob ID + bt_blob_id integer not null, + + -- The CGZ content hash, or null + bt_cgz_hash varbinary(255), + + PRIMARY KEY (bt_rev_id, bt_text_id), + + -- Sort by page for easy CGZ recompression + KEY (bt_page, bt_rev_id), + + -- For fast orphan searches + KEY (bt_text_id), + + -- Key for determining the revisions using a given blob + KEY (bt_cluster, bt_blob_id, bt_cgz_hash) +) /*$wgDBTableOptions*/; + diff --git a/maintenance/storage/trackBlobs.php b/maintenance/storage/trackBlobs.php new file mode 100644 index 0000000000..36eb731bbb --- /dev/null +++ b/maintenance/storage/trackBlobs.php @@ -0,0 +1,200 @@ + [... ]\n"; + echo "Adds blobs from a given ES cluster to the blob_tracking table\n"; + exit( 1 ); +} + +trackBlobs( $args ); + +function trackBlobs( $clusters ) { + initTrackingTable(); + trackRevisions( $clusters ); + trackOrphans( $clusters ); +} + +function initTrackingTable() { + $dbw = wfGetDB( DB_MASTER ); + if ( !$dbw->tableExists( 'blob_tracking' ) ) { + $dbw->sourceFile( dirname( __FILE__ ) . '/blob_tracking.sql' ); + } +} + +function getTextClause( $clusters ) { + $dbr = wfGetDB( DB_SLAVE ); + $textClause = ''; + foreach ( $clusters as $cluster ) { + if ( $textClause != '' ) { + $textClause .= ' OR '; + } + $textClause .= 'old_text LIKE ' . $dbr->addQuotes( $dbr->escapeLike( "DB://$cluster/" ) . '%' ); + } + return $textClause; +} + +function interpretPointer( $text ) { + if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) { + return false; + } + return array( + 'cluster' => $m[1], + 'id' => intval( $m[2] ), + 'hash' => isset( $m[3] ) ? $m[2] : null + ); +} + +/** + * Scan the revision table for rows stored in the specified clusters + */ +function trackRevisions( $clusters ) { + $dbw = wfGetDB( DB_MASTER ); + $dbr = wfGetDB( DB_SLAVE ); + $batchSize = 10; + $reportingInterval = 10; + + $textClause = getTextClause( $clusters ); + $startId = 0; + $endId = $dbr->selectField( 'revision', 'MAX(rev_id)', false, __METHOD__ ); + $batchesDone = 0; + $rowsInserted = 0; + + echo "Finding revisions...\n"; + + while ( true ) { + $res = $dbr->select( array( 'revision', 'text' ), + array( 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ), + array( + 'rev_id > ' . $dbr->addQuotes( $startId ), + 'rev_text_id=old_id', + $textClause, + "old_flags LIKE '%external%'", + ), + __METHOD__, + array( + 'ORDER BY' => 'rev_id', + 'LIMIT' => $batchSize + ) + ); + if ( !$res->numRows() ) { + break; + } + + $insertBatch = array(); + foreach ( $res as $row ) { + $startId = $row->rev_id; + $info = interpretPointer( $row->old_text ); + if ( !$info ) { + echo "Invalid DB:// URL in rev_id {$row->rev_id}\n"; + continue; + } + if ( !in_array( $info['cluster'], $clusters ) ) { + echo "Invalid cluster returned in SQL query: {$info['cluster']}\n"; + continue; + } + $insertBatch[] = array( + 'bt_page' => $row->rev_page, + 'bt_rev_id' => $row->rev_id, + 'bt_text_id' => $row->old_id, + 'bt_cluster' => $info['cluster'], + 'bt_blob_id' => $info['id'], + 'bt_cgz_hash' => $info['hash'] + ); + } + $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ ); + $rowsInserted += count( $insertBatch ); + + ++$batchesDone; + if ( $batchesDone >= $reportingInterval ) { + $batchesDone = 0; + echo "$startId / $endId\n"; + wfWaitForSlaves( 5 ); + } + } + echo "Found $rowsInserted revisions\n"; +} + +/** + * Scan the text table for orphan text + */ +function trackOrphans( $clusters ) { + # Wait until the blob_tracking table is available in the slave + $dbw = wfGetDB( DB_MASTER ); + $dbr = wfGetDB( DB_SLAVE ); + $pos = $dbw->getMasterPos(); + $dbr->masterPosWait( $pos, 100000 ); + + $batchSize = 10; + $reportingInterval = 10; + + $textClause = getTextClause( $clusters ); + $startId = 0; + $endId = $dbr->selectField( 'text', 'MAX(old_id)', false, __METHOD__ ); + $rowsInserted = 0; + $batchesDone = 0; + + echo "Finding orphan text...\n"; + + # Scan the text table for orphan text + while ( true ) { + $res = $dbr->select( array( 'text', 'blob_tracking' ), + array( 'old_id', 'old_flags', 'old_text' ), + array( + 'old_id>' . $dbr->addQuotes( $startId ), + $textClause, + "old_flags LIKE '%external%'", + 'bt_text_id IS NULL' + ), + __METHOD__, + array( + 'ORDER BY' => 'old_id', + 'LIMIT' => $batchSize + ), + array( 'blob_tracking' => array( 'LEFT JOIN', 'bt_text_id=old_id' ) ) + ); + $ids = array(); + foreach ( $res as $row ) { + $ids[] = $row->old_id; + } + + if ( !$res->numRows() ) { + break; + } + + $insertBatch = array(); + foreach ( $res as $row ) { + $startId = $row->old_id; + $info = interpretPointer( $row->old_text ); + if ( !$info ) { + echo "Invalid DB:// URL in old_id {$row->old_id}\n"; + continue; + } + if ( !in_array( $info['cluster'], $clusters ) ) { + echo "Invalid cluster returned in SQL query\n"; + continue; + } + + $insertBatch[] = array( + 'bt_page' => 0, + 'bt_rev_id' => 0, + 'bt_text_id' => $row->old_id, + 'bt_cluster' => $info['cluster'], + 'bt_blob_id' => $info['id'], + 'bt_cgz_hash' => $info['hash'] + ); + } + $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ ); + + $rowsInserted += count( $insertBatch ); + ++$batchesDone; + if ( $batchesDone >= $reportingInterval ) { + $batchesDone = 0; + echo "$startId / $endId\n"; + wfWaitForSlaves( 5 ); + } + } + echo "Found $rowsInserted orphan text rows\n"; +} -- 2.20.1